use super::{
    Attr, BoxedMemCache, MemCache, MpscReceiver, RawTask, Runnable, Signo, Task, TaskQueue,
    TaskRef, ThreadContext, WorkerSender,
};
use crate::event::{event_list_new, Event, Scheduler, POLLIN};
use crate::thread;
use crate::Error;
use core::future::Future;
use core::mem::MaybeUninit;
use core::ptr::NonNull;
use hioff::container_of_mut;
use hipool::{Boxed, MemPool, NullAlloc};

/// State of one runtime worker.
///
/// The two `Event` fields are embedded directly in the struct: their callbacks
/// (`task_handle` / `mpsc_fd_handle`) receive only the `&Event` and recover the owning
/// `Worker` from it with `container_of_mut!`. The layout is pinned with `#[repr(C)]`.
#[allow(dead_code)]
#[repr(C)]
pub(crate) struct Worker {
    /// Event registered on this worker's own task-queue fd; handled by `task_handle`.
    task_event: Event,
    /// Slice of task queues, indexed by worker id (this worker's queue is `queue[id]`).
    queue: NonNull<[TaskQueue]>,
    /// Incremented once for every task this worker hands to its scheduler.
    sched_cnt: u64,
    /// Sender used by `spawn` for tasks that carry no hash (`attr.hash == 0`).
    sender: WorkerSender,
    /// Event registered on the read fd of `mpsc_recv`; handled by `mpsc_fd_handle`.
    mpsc_fd_event: Event,
    /// Receiving end of the control-signal (`Signo`) channel.
    mpsc_recv: MpscReceiver,
    /// Memory cache created for this `(group, id)`; cleaned on `Signo::SIG_CLEAR_CACHE`.
    cache: BoxedMemCache,
    /// Thread context this worker was activated on; `None` until `active` has run.
    ctx: Option<NonNull<ThreadContext>>,
    /// Task recorded through `set_current_task`, if any.
    current_task: Option<NonNull<RawTask>>,
    /// Incremented every time `set_current_task` clears the current task.
    exit_cnt: u64,
    /// Group this worker belongs to.
    group: u8,
    /// Id of this worker; also its index into `queue`.
    id: u16,
}

// SAFETY(review): `Worker` stores raw `NonNull` pointers and is therefore not `Send`
// automatically. This impl asserts that handing a `Worker` to another thread is sound; the
// invariants that justify it are not visible in this file — TODO confirm against the code
// that creates and starts workers.
unsafe impl Send for Worker {}

/// Owning handle to a `Worker`, as returned by `Worker::new_in`.
type BoxedWorker = Boxed<'static, Worker, NullAlloc>;

impl Worker {
    pub(crate) fn new_in(
        pool: &'static MemPool,
        group: u8,
        id: u16,
        mpsc_recv: MpscReceiver,
        queue: NonNull<[TaskQueue]>,
    ) -> Result<BoxedWorker, Error> {
        Boxed::new_in(
            pool,
            Self {
                task_event: Event::new(Self::task_handle),
                queue,
                sender: WorkerSender::new(id),
                mpsc_fd_event: Event::new(Self::mpsc_fd_handle),
                mpsc_recv,
                cache: MemCache::new_in(pool, group, id)?,
                ctx: None,
                current_task: None,
                sched_cnt: 0,
                exit_cnt: 0,
                group,
                id,
            },
        )
        .map(|boxed| boxed.into())
    }

    pub(crate) fn spawn<T: Future>(
        &mut self,
        future: T,
        attr: &Attr,
    ) -> Result<TaskRef, Error> {
        let mut task = self.task_from(future, attr)?;
        let queue = unsafe { self.queue.as_ref() };
        if attr.hash == 0 {
            let sched = unsafe { self.ctx.unwrap().as_mut() }.sched();
            self.sender.send(queue, task.clone(), sched);
        } else {
            let id = (attr.hash % queue.len()) as u16;
            task.status.set_local(id);
            if id != self.id {
                queue[id as usize].push(task.clone());
            } else {
                self.sched_local(task.clone());
            }
        }
        Ok(task)
    }

    pub(crate) fn spawn_local<T: Future>(
        &mut self,
        future: T,
        attr: &Attr,
    ) -> Result<TaskRef, Error> {
        let mut task = self.task_from(future, attr)?;
        task.status.set_local(self.id);
        self.sched_local(task.clone());
        Ok(task)
    }

    pub(crate) fn wake(&mut self, task: TaskRef, local: Option<u16>) {
        let queue = unsafe { self.queue.as_ref() };
        // 优先处理最常见场景. rust未提供分支预测功能
        #[allow(clippy::unnecessary_unwrap)]
        if local.is_none() || local.unwrap() == self.id {
            self.sched_local(task);
        } else {
            queue[local.unwrap() as usize].push(task);
        }
    }

    pub(crate) fn group_id(&self) -> u8 {
        self.group
    }

    pub(crate) fn worker_id(&self) -> u16 {
        self.id
    }

    pub(crate) fn cache(&self) -> &MemCache {
        &self.cache
    }

    pub(crate) fn current_task(&mut self) -> NonNull<RawTask> {
        self.current_task.unwrap()
    }

    pub(crate) fn set_current_task(&mut self, current: Option<NonNull<RawTask>>) {
        self.current_task = current;
        if self.current_task.is_none() {
            self.exit_cnt += 1;
        }
    }
}

impl Runnable for Worker {
    /// Binds this worker to the thread context `ctx` it is going to run on.
    ///
    /// Steps, in order:
    /// 1. remember `ctx` in `self.ctx`;
    /// 2. register `task_event` for `POLLIN` on this worker's task-queue fd;
    /// 3. register `mpsc_fd_event` for `POLLIN` on the signal receiver's read fd;
    /// 4. store a pointer to `self` as the scheduler's private data (read back by
    ///    `worker_from`) and publish `self` as the current worker.
    ///
    /// # Errors
    /// Returns the error of either fd registration. If the second registration fails, the
    /// first one is removed again (best effort) before the error is returned.
    fn active(&mut self, ctx: &mut ThreadContext) -> Result<(), Error> {
        // `worker_from` turns this pointer back into `&mut Worker`, so it has to be derived
        // from the mutable borrow; a pointer obtained through `*const` only carries
        // shared (read-only) access.
        let data = self as *mut Self as *const ();
        // The stored pointer is later dereferenced with `as_mut()`. Mutating through a
        // pointer created from `&T` is undefined behaviour, hence the `&mut` reborrow.
        self.ctx = Some(NonNull::from(&mut *ctx));
        let sched = ctx.sched();

        let q_fd = unsafe { self.queue.as_ref() }[self.id as usize].fd();
        unsafe { sched.add_fd_event(&self.task_event, POLLIN, q_fd)? };
        unsafe { sched.add_fd_event(
            &self.mpsc_fd_event,
            POLLIN,
            self.mpsc_recv.read_fd_event().0,
        ) }
        .map_err(|e| {
            // Roll back the first registration; its own failure cannot be reported here.
            let _ = unsafe { sched.del_fd_event(q_fd) };
            e
        })?;
        sched.set_private_data(data);
        current_worker_set(Some(self));
        Ok(())
    }
}

impl Worker {
    /// Allocates the task for `future` from this worker's memory cache.
    #[cfg(feature = "task_mem_cache")]
    #[inline(always)]
    fn task_from<T: Future>(&mut self, future: T, attr: &Attr) -> Result<TaskRef, Error> {
        Task::new_in(&*self.cache, future, attr)
    }

    /// Allocates the task for `future` with `Task::new` (the memory cache is not used).
    #[cfg(not(feature = "task_mem_cache"))]
    #[inline(always)]
    fn task_from<T: Future>(&mut self, future: T, attr: &Attr) -> Result<TaskRef, Error> {
        Task::new(future, attr)
    }

    /// Hands `task` to the scheduler of this worker's thread and counts it.
    ///
    /// # Panics
    /// Panics if `active` has not set the thread context yet.
    fn sched_local(&mut self, task: TaskRef) {
        let sched = unsafe { self.ctx.unwrap().as_mut() }.sched();
        task.sched_waked(sched);
        self.sched_cnt += 1;
    }

    /// Callback of `task_event`: drains this worker's task queue into the scheduler.
    ///
    /// The popped tasks form a linked list. Each task is sorted into one of two event lists
    /// by priority (`0` goes to the low list, anything else to the high list); the lists
    /// are then added to the scheduler at level `0` and level `1` respectively.
    fn task_handle(e: &Event, _events: u32, sched: &mut Scheduler) {
        // `e` is the `task_event` field of a `Worker`; recover the owner from it.
        let this = unsafe { container_of_mut!(e, Self, task_event) };
        let queue = unsafe { this.queue.as_ref() };
        let mut tasks = queue[this.id as usize].pop();
        let mut hi_events = event_list_new();
        let mut lo_events = event_list_new();
        while !tasks.is_null() {
            let task = unsafe { TaskRef::from(tasks) };
            // Read the successor before the task is handed over to an event list.
            tasks = unsafe { task.node.next() };
            if task.status.priority() == 0 {
                task.add_waked(&mut lo_events);
            } else {
                task.add_waked(&mut hi_events);
            }
            this.sched_cnt += 1;
        }
        unsafe { sched.add_event_list(&mut lo_events, 0) };
        unsafe { sched.add_event_list(&mut hi_events, 1) };
    }

    /// Callback of `mpsc_fd_event`: receives up to `RECV_BUF_MAX` control signals and acts
    /// on each of them.
    ///
    /// * `SIG_CLEAR_CACHE` cleans the memory cache.
    /// * `SIG_STOP` stops the thread context this worker runs on.
    /// * `SIG_NAME(name)` renames the thread; an empty name selects `"runtime"`.
    fn mpsc_fd_handle(e: &Event, _events: u32, _sched: &mut Scheduler) {
        // `e` is the `mpsc_fd_event` field of a `Worker`; recover the owner from it.
        let this = unsafe { container_of_mut!(e, Self, mpsc_fd_event) };
        // An array of `MaybeUninit` needs no initialisation, so `assume_init_mut` is fine.
        let mut signals = MaybeUninit::<[MaybeUninit<Signo>; RECV_BUF_MAX]>::uninit();
        let signals = unsafe { signals.assume_init_mut() };
        if let Some(cnt) = this.mpsc_recv.try_recv_slice(signals) {
            // Only the first `cnt` slots are read back out of the buffer.
            for sig in &mut signals[..cnt] {
                match unsafe { sig.assume_init_read() } {
                    Signo::SIG_CLEAR_CACHE => {
                        this.cache.clean();
                    }
                    Signo::SIG_STOP => {
                        let thr = unsafe { this.ctx.unwrap().as_mut() };
                        thr.stop();
                    }
                    Signo::SIG_NAME(name) => {
                        this.set_thread_name(if name.is_empty() { "runtime" } else { name });
                    }
                }
            }
        }
    }

    /// Names the calling thread `<name>_<group>_<id>`.
    ///
    /// The name is cut to what fits the platform limit, so a long `name` can push the
    /// `_<group>_<id>` suffix out partially or completely. Failure to set the name is
    /// ignored on purpose: a thread name is purely diagnostic.
    fn set_thread_name(&self, name: &'static str) {
        // Linux accepts at most 16 bytes *including* the terminating NUL. A longer string
        // makes `pthread_setname_np` fail with `ERANGE`, so the buffer must not exceed it.
        const THREAD_NAME_BUF: usize = 16;
        // `c_char` is `i8` on some targets and `u8` on others; never hard-code either.
        let mut thr_name = [0 as libc::c_char; THREAD_NAME_BUF];
        // `%.*s` takes its precision as a C `int`. Clamping keeps the conversion from
        // wrapping negative, which would make `snprintf` ignore the precision and read
        // past the end of the (not NUL-terminated) `str`.
        let name_len = name.len().min(THREAD_NAME_BUF - 1) as libc::c_int;
        unsafe {
            libc::snprintf(
                thr_name.as_mut_ptr(),
                thr_name.len(),
                b"%.*s_%d_%d\0".as_ptr().cast::<libc::c_char>(),
                name_len,
                name.as_ptr(),
                libc::c_int::from(self.group),
                libc::c_int::from(self.id),
            );
            let tid = libc::pthread_self();
            let _ = libc::pthread_setname_np(tid, thr_name.as_ptr());
        }
    }
}

// Slot holding the address of the current `Worker` as an integer (`0` means "none"); it is
// written by `current_worker_set` and read by `current_worker`.
// NOTE(review): `thread::TssData` is presumably thread-specific storage, which would make
// the slot per-thread — confirm against `crate::thread`.
static mut CURRENT_WORKER: MaybeUninit<thread::TssData> = MaybeUninit::uninit();

/// Initialises `CURRENT_WORKER`.
///
/// # Safety
/// Must have completed before `CURRENT_WORKER` is read, and must not run concurrently with
/// any other access to it.
///
/// # Panics
/// Panics if `thread::TssData::new()` fails.
unsafe fn tssdata_init() {
    unsafe {
        CURRENT_WORKER.write(thread::TssData::new().unwrap());
    }
}

// Registers `tssdata_init` with `hictor`; presumably this runs it as a program constructor
// before `main` — TODO confirm against the `hictor` documentation.
hictor::ctor!(tssdata_init);

/// Returns the worker published through `current_worker_set`, or `None` when the slot
/// holds no address.
pub(crate) fn current_worker() -> Option<&'static mut Worker> {
    let addr = unsafe { CURRENT_WORKER.assume_init_ref() }.get();
    // A stored address of zero means that no worker is published.
    (addr > 0).then(|| unsafe { &mut *(addr as *mut Worker) })
}

/// Publishes `worker` as the current worker; `None` clears the slot.
pub(crate) fn current_worker_set(worker: Option<&mut Worker>) {
    // The slot stores the worker's address as an integer, with zero standing for "none".
    let addr = worker.map_or(0, |w| w as *const Worker as usize);
    unsafe { CURRENT_WORKER.assume_init_ref() }.set(addr);
}

/// Recovers the `Worker` whose address is stored as the private data of `sched`.
///
/// # Safety
/// The private data of `sched` must be a pointer to a live `Worker` (as stored by
/// `Worker::active`), and no other reference to that worker may be in use while the
/// returned one is alive.
pub(crate) unsafe fn worker_from(sched: &mut Scheduler) -> &mut Worker {
    unsafe {
        let worker = sched.private_data().cast_mut().cast::<Worker>();
        &mut *worker
    }
}

/// Capacity of the stack buffer `Worker::mpsc_fd_handle` receives control signals into,
/// i.e. the largest number of signals handled by a single call.
const RECV_BUF_MAX: usize = 64;
